离线处理和实时处理 您所在的位置:网站首页 实时 离线 离线处理和实时处理

离线处理和实时处理

#离线处理和实时处理| 来源: 网络整理| 查看: 265

大数据实战项目(1)-项目简介、开发技术、工具、架构等 大数据实战项目(2)-数据采集、处理、分发流程所涉及到的框架及配置 这一部分主要是对数据进行离线处理和实时处理的总结。

离线数据处理 MySQL+Hive

MySQL一方面用来存储Hive的元数据,另一方面存储离线分析的结果。

1)MySQL的安装

2)Hive的安装

#hive-log4j.properties #日志目录需要提前创建 property.hive.log.dir = /opt/modules/hive-2.1.0/logs #修改hive-env.sh配置文件 #Set HADOOP_HOME to point to a specific hadoop install directory HADOOP_HOME=/opt/modules/hadoop-2.6.0 HBASE_HOME=/opt/modules/hbase-1.0.0-cdh5.4.0 # Hive Configuration Directory can be controlled by: export HIVE_CONF_DIR=/opt/modules/hive-2.1.0/conf # Folder containing extra ibraries required for hive compilation/execution can be controlled by: export HIVE_AUX_JARS_PATH=/opt/modules/hive-2.1.0/lib

3)Hive与MySQL集成

创建hive-site.xml文件,配置mysql元数据库metastore javax.jdo.option.ConnectionURL jdbc:mysql://bigdata-pro01.bigDAta.com/metastore?createDatabaseIfNotExist=true javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver javax.jdo.option.ConnectionUserName root javax.jdo.option.ConnectionPassword 123456 hive.cli.print.header true hive.cli.print.current.db true hbase.zookeeper.quorum bigdata-pro01.bigDAta.com,bigdata-pro02.bigDAta.com,bigdata-pro03.bigDAta.com 在MySQL数据中设置用户连接信息,可以无阻碍访问mysql数据库,其次要保证Hive所在节点能无密钥登录其他集群内节点

4)Hive与MySQL集成测试

启动HDFS和YARN服务启动Hive通过Hive服务创建表,并向这个表中加载数据,在Hive查看表中内容在MySQL数据库metastore中查看元数据 Hive+HBase集成

Hive是一个数据仓库,主要是转为MapReduce完成对大量数据的离线分析和决策,之前完成了Flume集成HBase,此时HBase中能源源不断地插入数据,那么如何使Hive中也有数据呢?使用外部表进行Hive与HBase的关联

hbase.zookeeper.quorum bigdata-pro01.bigDAta.com,bigdata-pro02.bigDAta.com,bigdata-pro03.bigDAta.com

将 HBase中的部分jar包拷贝到Hive中,如果两者都是CDH版本,就不需要进行拷贝;若hive安装时自带了以下jar包,将其删除。使用软连接的方式

export HBASE_HOME=/opt/modules/hbase-1.0.0-cdh5.4.0 export HIVE_HOME=/opt/modules/hive-2.1.0 ln -s $HBASE_HOME/lib/hbase-server-1.0.0-cdh5.4.0.jar $HIVE_HOME/lib/hbase-server-1.0.0-cdh5.4.0.jar ln -s $HBASE_HOME/lib/hbase-client-1.0.0-cdh5.4.0.jar $HIVE_HOME/lib/hbase-client-1.0.0-cdh5.4.0.jar ln -s $HBASE_HOME/lib/hbase-protocol-1.0.0-cdh5.4.0.jar $HIVE_HOME/lib/hbase-protocol-1.0.0-cdh5.4.0.jar ln -s $HBASE_HOME/lib/hbase-it-1.0.0-cdh5.4.0.jar $HIVE_HOME/lib/hbase-it-1.0.0-cdh5.4.0.jar ln -s $HBASE_HOME/lib/htrace-core-3.0.4.jar $HIVE_HOME/lib/htrace-core-3.0.4.jar ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.0.0-cdh5.4.0.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.0.0-cdh5.4.0.jar ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.0.0-cdh5.4.0.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.0.0-cdh5.4.0.jar ln -s $HBASE_HOME/lib/high-scale-lib-1.1.1.jar $HIVE_HOME/lib/high-scale-lib-1.1.1.jar ln -s $HBASE_HOME/lib/hbase-common-1.0.0-cdh5.4.0.jar $HIVE_HOME/lib/hbase-common-1.0.0-cdh5.4.0.jar

在Hive中创建一个与HBase中的表建立关联的外部表

create external table weblogs( id string, datatime string, userid string, searchname string, retorder string, cliorder string, cliurl string ) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES("hbase.columns.mapping" = ":key,info:datatime,info:userid,info:searchname,info:retorder,info:cliorder,info:cliurl") TBLPROPERTIES("hbase.table.name" = "weblogs");

可通过在Hive与HBase中输入count ‘weblogs’,查看数据是否同步。

Cloudera Hue可视化分析

1)下载、安装及编译 详细过程,记录在Hadoop可视化神器-Hue安装、编译、运行 2)基本配置

1.配置desktop/conf/hue.ini 2.修改desktop.db文件权限

3)集成 具体内容参考:Hue集成HDFS、YARN、Hive、MySql、HBase的相关配置,此处仅是流程。

与HDFS集成 在这里插入图片描述

与YARN集成 在这里插入图片描述

与Hive集成

在这里插入图片描述

与MySQL集成

在这里插入图片描述

与HBase集成

在这里插入图片描述

实时数据处理 Spark与Kafka集成

1)Spark下载 安装与编译

2)Structured Streaming 与Kafka集成

将kafka_2.11-0.10.0.0.jar kafka-clients-0.10.0.0.jar spark-sql-kafka-0-10_2.11-2.2.0.jar spark-streaming-kafka-0-10_2.11-2.1.0.jar等包添加到spark下的jars目录下

在IDEA中编写如下代码,Structured Streaming从kafka中读取数据,并进行计算

val spark = SparkSession.builder() .master("local[2]") .appName("streaming").getOrCreate() val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "bigdata-pro01.bigDAta.com:9092,bigdata-pro02.bigDAta.com:9092,bigdata-pro03.bigDAta.com:9092") .option("subscribe", "weblogs") .load() import spark.implicits._ val lines = df.selectExpr("CAST(value AS STRING)").as[String] val weblog = lines.map(_.split(",")) .map(x => Weblog(x(0), x(1), x(2),x(3),x(4),x(5))) val titleCount = weblog .groupBy("searchname").count().toDF("titleName","count") Spark与MySQL集成

由于这里仅仅需要对报表进行展示,前台展示的字段并不多,MySQL完全可以支撑。在HBase中有几百万条数据( 一个浏览话题可能有十几万人搜索过,也就是说一个话题就有十几万条数据,这么大量数据当然要存在Hbase中 ),而经过Spark的计算, 这十几万条数据在mysql中就变成了一条数据(titleName,count)。

如果需要实时查询用户各种信息(数据量很大,字段很多),那么就需要实时的直接从Hbase里查,而不会在Mysql中。

val url ="jdbc:mysql://bigdata-pro01.bigDAta.com:3306/test" val username="root" val password="123456" val writer = new JDBCSink(url,username,password) val query = titleCount.writeStream .foreach(writer) .outputMode("update") .trigger(ProcessingTime("5 seconds")) .start() query.awaitTermination()

其中的JDBCSink具体代码如下所示:

import java.sql._ import org.apache.spark.sql.{ForeachWriter, Row} class JDBCSink(url:String, username:String,password:String) extends ForeachWriter[Row]{ //var是一个变量 //val常量 var statement : Statement =_ var resultSet : ResultSet =_ var connection : Connection=_ override def open(partitionId: Long, version: Long): Boolean = { connection = new MySqlPool(url,username,password).getJdbcConn(); statement = connection.createStatement() return true } //处理数据 override def process(value: Row): Unit = { // 将titleName中的[[]]用空格代替。标记一个中括号表达式的开始。要匹配 [,请使用 \[ val titleName = value.getAs[String]("titleName").replaceAll("[\\[\\]]","") val count = value.getAs[Long]("count"); val querySql = "select 1 from webCount " + "where titleName = '"+titleName+"'" val updateSql = "update webCount set " + "count = "+count+" where titleName = '"+titleName+"'" val insertSql = "insert into webCount(titleName,count)" + "values('"+titleName+"',"+count+")" try{ var resultSet = statement.executeQuery(querySql) if(resultSet.next()){ //如果有执行updateSql statement.executeUpdate(updateSql) }else{ //没有的话就执行insertSql statement.execute(insertSql) } }catch { case ex: SQLException => { println("SQLException") } case ex: Exception => { println("Exception") } case ex: RuntimeException => { println("RuntimeException") } case ex: Throwable => { println("Throwable") } } } override def close(errorOrNull: Throwable): Unit = { if(statement==null){ statement.close() } if(connection==null){ connection.close() } } }

而在JDBCSink中用到的MySqlPool连接池的具体代码如下所示

import java.sql.{Connection, DriverManager} import java.util class MySqlPool(url:String, user:String, pwd:String) extends Serializable{ private val max = 3 //连接池连接总数 private val connectionNum = 1 //每次产生连接数 private var conNum = 0 //当前连接池已产生的连接数 private val pool = new util.LinkedList[Connection]() //连接池 //获取连接 def getJdbcConn() : Connection = { //同步代码块 AnyRef.synchronized({ if(pool.isEmpty){ //加载驱动 preGetConn() for(i


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有